Fork me on GitHub

grpc-go 客户端invoke调用源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
	// Dial is non-blocking by default: it returns a *ClientConn immediately
	// and establishes the connection in the background.
	conn, err := grpc.Dial("127.0.0.1:1235", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	// BUG FIX: Close must be deferred only after the error check. In the
	// original the defer ran first, so a failed Dial (nil conn) would
	// panic with a nil pointer dereference inside the deferred Close.
	defer conn.Close()

	// Build the generated stub around the logical connection.
	client := server_hello_proto.NewTigerServiceClient(conn)

	// Issue the unary RPC and log the response (or die on error).
	response, err := client.HelloTiger(context.Background(), &server_hello_proto.HelloRequest{
		Name: "ban",
		Age:  11,
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(response)
}

上一篇我们分析了Dial建立连接的流程。现在继续往下分析。

1
client := server_hello_proto.NewTigerServiceClient(conn)

这里是使用pb文件生成的代码去新建一个客户端stub,其实也就是使用一个结构体类型(tigerServiceClient)的值保存ClientConn连接属性。

1
2
3
// tigerServiceClient is the generated (unexported) client stub; its only
// state is the logical connection used to issue RPCs.
type tigerServiceClient struct {
cc *grpc.ClientConn
}

这个tigerServiceClient类型实现了TigerServiceClient接口,也就是实现了根据proto定义的RPC service接口编译自动生成的客户端stub api接口。

1
2
3
4
// TigerServiceClient is the client API generated from the proto service
// definition; the stub type above satisfies it.
type TigerServiceClient interface {
// HelloTiger performs a unary RPC; opts customize this single call.
HelloTiger(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
...
}

HelloTiger这个方法入参有三个,查看grpc.CallOption这个参数类型,这是一个接口类型

1
2
3
4
5
6
7
8
9
10
11
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
// Implementations hook both sides of an RPC: before runs pre-send, after
// runs post-completion.
type CallOption interface {
// before is called before the call is sent to any server. If before
// returns a non-nil error, the RPC fails with that error.
before(*callInfo) error

// after is called after the call has completed. after cannot return an
// error, so any failures should be reported via output parameters.
after(*callInfo)
}

这个接口类型有before和after方法,我们可以实现这个接口,在RPC方法调用前后会调用before和after方法去执行我们的实现逻辑。

然后我们再看客户端stub api的方法实现

1
2
3
4
5
6
7
8
// HelloTiger implements the generated stub method: it allocates the response
// value and delegates the wire-level work to ClientConn.Invoke using the
// fully-qualified method name.
func (c *tigerServiceClient) HelloTiger(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) {
	resp := new(HelloResponse)
	if err := c.cc.Invoke(ctx, "/server_hello_proto.TigerService/HelloTiger", in, resp, opts...); err != nil {
		return nil, err
	}
	return resp, nil
}

这里的核心是Invoke调用,也就是本篇分析的重点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

// Invoke sends the RPC request on the wire and returns after response is
// received. This is typically called by generated code.
// Invoke sends the RPC over the logical connection (ClientConn) and returns
// once the response has been received; it is normally called by the stub
// code emitted by the proto code generator.
//
// All errors returned by Invoke are compatible with the status package.
// Every error returned by Invoke can be inspected via the status package.
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
// Merge the CallOptions configured at Dial time with the per-call ones into
// a single slice so an interceptor can observe the full effective set.
opts = combine(cc.dopts.callOptions, opts)


if cc.dopts.unaryInt != nil {
// A unary interceptor is installed: route the call through it (the
// interceptor is expected to eventually call the invoke function itself).
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
// No interceptor configured: call invoke directly.
return invoke(ctx, method, args, reply, cc, opts...)
}

invoke函数声明

1
2
3
4
5
6
7
8
9
10
11
12
13
// invoke drives one complete unary RPC: create the client stream, send the
// single request, then block for the single response.
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
// Obtain the transport and wrap it into a ClientStream. This step covers
// load balancing, deadline handling, encoding and stream setup — largely
// mirroring the server-side behavior.
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
// Send the request message.
if err := cs.SendMsg(req); err != nil {
return err
}
// Receive the response into reply and return the final status.
return cs.RecvMsg(reply)
}

创建客户端流对象时,会循环调用callOption的before方法,做一些发送请求前的处理

1
2
3
4
5
// Excerpt from newClientStream: run every CallOption's before hook prior to
// sending; a non-nil error aborts stream creation as an RPC-level error.
for _, o := range opts {
if err := o.before(c); err != nil {
return nil, toRPCErr(err)
}
}

发送请求SendMsg方法声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

// SendMsg serializes, compresses and writes one request message to the
// stream, with retry support and optional binary logging.
func (cs *clientStream) SendMsg(m interface{}) (err error) {
defer func() {
if err != nil && err != io.EOF {
// Call finish on the client stream for errors generated by this SendMsg
// call, as these indicate problems created by this client. (Transport
// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
// error will be returned from RecvMsg eventually in that case, or be
// retried.)
cs.finish(err)
}
}()
// Reject sends issued after the client half of the stream was closed.
if cs.sentLast {
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
}
// For a non-client-streaming call the client sends exactly one message,
// so this send is also the last one.
if !cs.desc.ClientStreams {
cs.sentLast = true
}

// Preprocess the request: serialize and compress it, producing the wire
// header, the payload to transmit, and the raw (uncompressed) data.
// load hdr, payload, data
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}

// TODO(dfawley): should we be checking len(data) instead?
// Enforce the configured maxSendMessageSize on the serialized+compressed
// payload (the preset default is math.MaxInt32); oversized messages fail.
if len(payload) > *cs.callInfo.maxSendMessageSize {
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
}
msgBytes := data // Store the pointer before setting to nil. For binary logging.
// op performs the write for one attempt; withRetry may replay it.
op := func(a *csAttempt) error {
// This is where the data is actually written to the transport.
err := a.sendMsg(m, hdr, payload, data)
// nil out the message and uncomp when replaying; they are only needed for
// stats which is disabled for subsequent attempts.
m, data = nil, nil
return err
}
// Start the send, buffering the message so it can be replayed on retry.
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
OnClientSide: true,
Message: msgBytes,
})
}
return
}

客户端流对象接收响应信息方法声明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// RecvMsg blocks until a response message arrives, deserializes it into m,
// and finishes the stream on error or for non-server-streaming calls.
func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.binlog != nil && !cs.serverHeaderBinlogged {
// Call Header() to binary log header if it's not already logged.
cs.Header()
}
var recvInfo *payloadInfo
if cs.binlog != nil {
recvInfo = &payloadInfo{}
}
// Receive the server's result (with retry) and unmarshal it into m, which
// is the caller-supplied response value.
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ServerMessage{
OnClientSide: true,
Message: recvInfo.uncompressedBytes,
})
}
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)

if cs.binlog != nil {
// finish will not log Trailer. Log Trailer here.
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
Trailer: cs.Trailer(),
Err: err,
}
if logEntry.Err == io.EOF {
logEntry.Err = nil
}
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
cs.binlog.Log(logEntry)
}
}
return err
}